l1: add replicated database #29161
Conversation
Force-pushed from 2a46d7c to 37e3199
Pull request overview
This PR introduces a new replicated_database abstraction that wraps an LSM database with Raft-based replication. The database is leader-only and uses object storage for data persistence while replicating its manifest through Raft for fault tolerance.
Key changes:
- Added timeout support to lsm::database::flush() to prevent indefinite blocking
- Introduced memory_persistence_controller for testing failure scenarios
- Implemented the replicated_database class that coordinates LSM operations with Raft replication (a hedged interface sketch follows below)
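For orientation, the following is a minimal C++ sketch of what such a leader-only wrapper could look like. The class layout, member names, and signatures are illustrative assumptions, not the declarations from this PR; iobuf and lsm::database are the project's existing types.

```cpp
// Illustrative sketch only: names, types, and signatures are assumptions,
// not the interface introduced by this PR. iobuf and lsm::database are the
// project's existing types and are not declared here.
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>

#include <chrono>
#include <memory>
#include <optional>

class replicated_database {
public:
    // Open on the current leader: load the manifest replicated through the
    // Raft STM, then replay any replicated-but-unpersisted (volatile) writes
    // so the opened view matches the tip of the committed log.
    ss::future<> open();

    // Writes are replicated through Raft before being applied to the LSM.
    ss::future<> put(iobuf key, iobuf value);

    // Flush the LSM; the optional timeout bounds how long we wait on the
    // metadata persistence layer instead of retrying forever.
    ss::future<> flush(std::optional<std::chrono::milliseconds> timeout);

private:
    std::unique_ptr<lsm::database> _db; // data and metadata in object storage
    ss::abort_source _as;               // cancels in-flight operations
};
```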
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/v/lsm/lsm.h | Added optional timeout parameter to flush method signature |
| src/v/lsm/lsm.cc | Implemented timeout parameter forwarding in flush wrapper |
| src/v/lsm/io/memory_persistence.h | Added controller struct for injecting failures in tests |
| src/v/lsm/io/memory_persistence.cc | Implemented failure injection logic in memory persistence |
| src/v/lsm/db/tests/impl_test.cc | Added test for flush timeout behavior |
| src/v/lsm/db/impl.h | Updated flush signature with timeout parameter |
| src/v/lsm/db/impl.cc | Implemented timeout enforcement in flush operation |
| src/v/cloud_topics/level_one/metastore/lsm/tests/replicated_db_test.cc | Comprehensive test suite for replicated database functionality |
| src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD | Build configuration for new test |
| src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.h | Interface for Raft-replicated metadata persistence |
| src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc | Implementation of replicated metadata persistence |
| src/v/cloud_topics/level_one/metastore/lsm/replicated_db.h | Header for replicated database abstraction |
| src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc | Core implementation of replicated database operations |
| src/v/cloud_topics/level_one/metastore/lsm/BUILD | Build configuration for new libraries |
Force-pushed from c6676df to a69fd10
rockwotj left a comment
Nice, a quick glance mostly LGTM. Will look more next week when I'm back at a computer.
read_manifest(lsm::internal::database_epoch max_epoch) override {
    _as.check();
    auto _ = _gate.hold();
    auto term_result = co_await _stm->sync(std::chrono::seconds(30));
do we need to make all this abortable too? Maybe the io layer needs an abort source in the apis... Anyways not for this PR
Is it supposed to be invoked right after the leadership transfer?
do we need to make all this abortable too?
Done
Is it supposed to be invoked right after the leadership transfer?
Yeah, the expectation is that this is called when opening the database, before performing any updates on the database in a given term.
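As a hedged illustration of that ordering (the hook name below is hypothetical, and replicated_database::open() is assumed to do the manifest load plus volatile-buffer replay):

```cpp
// Hypothetical leadership hook, for illustration only.
ss::future<> on_became_leader(replicated_database& db) {
    // Open from the replicated manifest and replay the volatile buffer
    // before performing any updates in this term.
    co_await db.open();
    // Only after open() completes do we start serving writes for the term.
}
```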
cloud_io::remote* remote,
const cloud_storage_clients::bucket_name& bucket,
ss::abort_source& as) {
    auto term_result = co_await s->sync(std::chrono::seconds(30));
Same question: is it expected to be invoked right after the leadership transfer, or at start?
This is expected to be called upon becoming leader before replicating any LSM updates in the given term (hence all LSM updates go through an already opened replicated_database instance)
// Replay the writes in the volatile_buffer as writes to the database.
// These are writes that were replicated but not yet persisted to the
// manifest.
auto max_persisted_seqno = db.max_persisted_seqno();
Do I understand correctly that this replay is not the same as the STM log replay? Here we're applying batches which are already stored by the STM (in other words they're applied to the STM but not to the LSM).
That's correct: STM log replay gets us the Raft-replicated entries of the volatile buffer that have not yet been persisted in the LSM manifest. The replay here applies those write batches on top, so the opened database is caught up to the tip of the committed log.
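A rough sketch of that catch-up step, assuming the volatile buffer is exposed as a sequence of (seqno, row) entries; apart from max_persisted_seqno(), which appears in the diff above, the names are hypothetical:

```cpp
// Sketch only: volatile_row, write_batch_row, and db.put() are assumed
// shapes, not the PR's actual API.
ss::future<> replay_volatile_buffer(
  lsm::database& db, const std::vector<volatile_row>& buffer) {
    // Rows at or below the manifest's max persisted seqno are already
    // reflected in the opened database; only newer rows are re-applied.
    auto max_persisted = db.max_persisted_seqno();
    for (const auto& entry : buffer) {
        if (entry.seqno <= max_persisted) {
            continue;
        }
        co_await db.put(entry.row.key, entry.row.value.copy());
    }
}
```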
Force-pushed from f14503f to cfd5991
CI test results on build #78915
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.
.row
  = write_batch_row{.key = "key_before_reset", .value = iobuf::from("value_before_reset"),},
Copilot AI (Jan 12, 2026)
The line formatting breaks the designated initializer on a single line by placing the closing brace and comma separately. This should be reformatted to either fit on one line or break consistently across multiple lines for better readability.
Suggested change:
.row = write_batch_row{
  .key = "key_before_reset",
  .value = iobuf::from("value_before_reset"),
},
volatile_row{
  .seqno = lsm::sequence_number{100},
  .row
  = write_batch_row{.key = "reset_key", .value = iobuf::from("reset_value"),},
Copilot AI (Jan 12, 2026)
The line formatting breaks the designated initializer on a single line by placing the closing brace and comma separately. This should be reformatted to either fit on one line or break consistently across multiple lines for better readability.
volatile_row{
  .seqno = lsm::sequence_number{100},
  .row
  = write_batch_row{.key = "reset_key", .value = iobuf::from("reset_value"),},
Copilot AI (Jan 12, 2026)
The line formatting breaks the designated initializer on a single line by placing the closing brace and comma separately. This should be reformatted to either fit on one line or break consistently across multiple lines for better readability.
Plumbs a new struct into memory persistence to allow tests to fail operations. In the future this can be used to inject delays, randomized failures, etc.
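A minimal sketch of such a test-only failure knob, with field and method names that are assumptions rather than the ones added by this commit:

```cpp
#include <cstddef>
#include <optional>
#include <system_error>

// Hedged sketch of a failure-injection controller for memory persistence.
struct memory_persistence_controller {
    // When failures_remaining > 0, operations fail with injected_error.
    std::optional<std::error_code> injected_error;
    size_t failures_remaining{0};

    // Called by the memory persistence layer before each operation; returns
    // the error to surface, or nullopt to let the operation proceed.
    std::optional<std::error_code> maybe_fail() {
        if (injected_error && failures_remaining > 0) {
            --failures_remaining;
            return injected_error;
        }
        return std::nullopt;
    }
};
```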
Force-pushed from cfd5991 to ab8c3cd
Force push to rebase on dev
In case of errors in the metadata persistence layer, flush would previously hang until success. This adds an optional timeout for this case, which will be useful for an upcoming metadata persistence layer that uses Raft.
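A hedged sketch of what such a timeout-bounded flush loop could look like; try_persist_manifest() and retry_backoff() are hypothetical helpers standing in for the real persistence call, and the exact signature may differ:

```cpp
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timed_out_error.hh>

// Illustrative only, not the PR's implementation.
ss::future<>
database::flush(std::optional<ss::lowres_clock::duration> timeout) {
    std::optional<ss::lowres_clock::time_point> deadline;
    if (timeout) {
        deadline = ss::lowres_clock::now() + *timeout;
    }
    while (true) {
        auto ec = co_await try_persist_manifest();
        if (!ec) {
            co_return; // manifest persisted successfully
        }
        if (deadline && ss::lowres_clock::now() >= *deadline) {
            // Previously this loop retried until success; with a timeout we
            // give up and surface the failure to the caller.
            throw ss::timed_out_error{};
        }
        co_await retry_backoff();
    }
}
```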
Introduces a wrapper around cloud_persistence that replicates and serves the database manifest from Raft (while maintaining it in object storage as well). A subsequent commit will introduce usage of this to maintain a database across replicas of a Raft group.
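Piecing together the snippets quoted in the review threads above, the manifest read path looks roughly like the following; the glue around the quoted lines (error handling, exact types) is an assumption:

```cpp
// Reconstructed sketch around the lines quoted in the review; the error
// handling shown here is a guess, not the PR's actual behavior.
ss::future<std::optional<iobuf>>
read_manifest(lsm::internal::database_epoch max_epoch) {
    _as.check();
    auto _ = _gate.hold();
    // Sync the STM so everything committed up to the current term has been
    // applied before the manifest is read out of its state.
    auto term_result = co_await _stm->sync(std::chrono::seconds(30));
    if (!term_result) {
        throw std::runtime_error("failed to sync stm before manifest read");
    }
    if (!_stm->state().persisted_manifest) {
        // There is no persisted manifest yet.
        co_return std::nullopt;
    }
    co_return _stm->state().persisted_manifest->buf.copy();
}
```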
Introduces a class that wraps lsm::database with the appropriate object storage classes to be consistent across replica leaders (i.e. different instances see a consistent view of the database upon leadership changes).
Force-pushed from ab8c3cd to 1398d70
ss::future<std::optional<iobuf>>
read_manifest(lsm::internal::database_epoch max_epoch) override {
    _as.check();
btw I think this is kind of useless because we never call close until after all callers of this method have returned. Doesn't need to block this PR, we can fix in a followup (I want to integrate @nvartolomei's context thing into the LSM)
    // There is no persisted manifest.
    co_return std::nullopt;
}
co_return _stm->state().persisted_manifest->buf.copy();
probably could share this out too
Will put out a follow-up (and some shares missed in the other PR)
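As a hedged illustration of that follow-up (assuming iobuf exposes share(pos, len) and size_bytes(), and that the STM state allows sharing here):

```cpp
// Hypothetical follow-up: share the manifest buffer's fragments instead of
// copying them. share()/size_bytes() availability is an assumption here.
auto& manifest = _stm->state().persisted_manifest->buf;
co_return manifest.share(0, manifest.size_bytes());
```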
Builds on top of the new LSM STM and introduces a new replicated_database abstraction that is intended to be opened only on leaders. It is an lsm::database whose data and metadata storage are backed by object storage for recoverability, and whose manifest is replicated through Raft.

After opening the database from the serialized manifest in the STM, leaders are expected to apply the remaining write batches from the volatile buffer before serving subsequent requests. This expectation is encoded in the replicated_database::open() call.

Backports Required
Release Notes